
STT Evaluation: Refactor #601

Open
AkhileshNegi wants to merge 7 commits into main from refactor/stt-evaluation

Conversation


@AkhileshNegi AkhileshNegi commented Feb 12, 2026

Summary

Target issue is #PLEASE_TYPE_ISSUE_NUMBER
Explain the motivation for making this change. What existing problem does the pull request solve?

Checklist

Before submitting a pull request, please ensure that you mark these tasks.

  • Ran fastapi run --reload app/main.py or docker compose up in the repository root and tested.
  • If you've fixed a bug or added new code, ensured it is covered by test cases.

Notes

Please add any other information the reviewer might need here.

Summary by CodeRabbit

  • Refactor

    • STT evaluation submission now queues work as async background jobs for more scalable, responsive processing
    • Result processing optimized with bulk inserts for faster completion
  • Performance

    • Concurrent generation of signed audio URLs to speed batch preparation
  • Reliability

    • Improved logging and error handling for clearer run status and failure messages
  • Bug Fixes

    • Submission status now reports as "pending" more consistently

@AkhileshNegi AkhileshNegi self-assigned this Feb 12, 2026
@AkhileshNegi AkhileshNegi added the enhancement New feature or request label Feb 12, 2026

coderabbitai bot commented Feb 12, 2026

📝 Walkthrough

Refactors STT evaluation to enqueue batch submission as a Celery low-priority job instead of submitting synchronously; removes several CRUD helpers tied to per-sample result creation and updates; adds a Celery-executable batch_job service and shifts result persistence to bulk operations and concurrent signed-URL generation.

Changes

Cohort / File(s) Summary
API Route & Tests
backend/app/api/routes/stt_evaluations/evaluation.py, backend/app/tests/api/routes/test_stt_evaluation.py
Replaced synchronous batch submission with start_low_priority_job enqueue; added correlation_id trace retrieval; removed in-route per-sample result creation and DB refresh; tests updated to expect task ID and "pending" status.
Result Feedback API
backend/app/api/routes/stt_evaluations/result.py
Removed pre-check fetch of result before updating; now directly delegates to update_human_feedback.
CRUD surface removals
backend/app/crud/stt_evaluations/__init__.py, backend/app/crud/stt_evaluations/result.py, backend/app/crud/stt_evaluations/run.py
Removed exports and implementations for create_stt_results, update_stt_result, get_pending_results_for_run, and get_pending_stt_runs; API now relies on bulk-insert and other remaining CRUD ops.
Batch signing & submission
backend/app/crud/stt_evaluations/batch.py
Introduced concurrent signed URL generation via ThreadPoolExecutor with per-task error capture; removed in-loop commits and legacy error updates; raises only if all signing fails; aggregates success/failure counts (see the sketch after this table).
Cron / Polling & Result Processing
backend/app/crud/stt_evaluations/cron.py
Simplified per-project client init; changed polling paths to mark runs as failed when no batch jobs exist; replaced per-result updates with row construction and batch_insert_mappings for bulk inserts; updated logging and success/failure counters.
Dataset queries
backend/app/crud/stt_evaluations/dataset.py
Removed STTSamplePublic import and made limit parameter required for get_samples_by_dataset_id (removed default).
Services: Batch Job & Audio/Dataset
backend/app/services/stt_evaluations/batch_job.py, backend/app/services/stt_evaluations/audio.py, backend/app/services/stt_evaluations/dataset.py
Added execute_batch_submission Celery-target function that loads run, fetches samples, and calls batch submission inside a session; audio upload now derives size from file metadata; logging reformatted to key-value style.
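
To make the signing change concrete, here is a self-contained sketch of the ThreadPoolExecutor + as_completed pattern described for batch.py above; sign_one stands in for the real storage call, and all names are illustrative rather than the repository's API:

from concurrent.futures import ThreadPoolExecutor, as_completed


def sign_one(sample_id: int) -> str:
    # Stand-in for the real storage.get_signed_url call.
    return f"https://storage.example.com/audio/{sample_id}?sig=..."


def generate_signed_urls(
    sample_ids: list[int], max_workers: int = 10
) -> tuple[dict[int, str], dict[int, str]]:
    """Sign URLs concurrently, capturing per-task errors instead of aborting."""
    signed: dict[int, str] = {}
    failed: dict[int, str] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(sign_one, sid): sid for sid in sample_ids}
        for future in as_completed(futures):
            sid = futures[future]
            try:
                signed[sid] = future.result()
            except Exception as exc:
                failed[sid] = str(exc)
    # Mirror the described behaviour: raise only if every signing task failed.
    if sample_ids and not signed:
        raise RuntimeError("All signed URL generation failed")
    return signed, failed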

Sequence Diagram

sequenceDiagram
    participant Client
    participant API as API Route
    participant Queue as Celery Queue
    participant Worker as Batch Worker
    participant DB as Database
    participant Service as Batch Service

    Client->>API: POST start STT evaluation
    API->>DB: create STT run (pending)
    DB-->>API: run created
    API->>Queue: enqueue execute_batch_submission(run_id, dataset_id, org_id)
    Queue-->>API: returns task_id
    API-->>Client: 202 Accepted (run_id, status: pending, task_id)

    Queue->>Worker: deliver task
    Worker->>DB: fetch run by id/org/project
    Worker->>DB: fetch samples for dataset (limit = run.total_items)
    Worker->>Service: start_stt_evaluation_batch(run, samples)
    Service->>Service: generate signed URLs concurrently
    Service->>External: submit batch job to provider
    External-->>Service: batch response (keys/status)
    Service->>DB: bulk_insert STT results (on completion)
    Service->>DB: update run status (submitted/processing/failed)
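A rough worker-side skeleton matching the diagram, assembled from the walkthrough above; the session setup and the get_stt_run helper are assumptions, not the repository's exact code:

from sqlmodel import Session


def execute_batch_submission(
    project_id: int,
    job_id: str,
    task_id: str,
    task_instance,
    organization_id: int,
    dataset_id: int,
    **kwargs,
) -> dict:
    run_id = int(job_id)  # see the line-40 review note below about validating this
    with Session(engine) as session:  # `engine` assumed to be the app's shared engine
        run = get_stt_run(session, run_id=run_id, organization_id=organization_id)  # hypothetical helper
        samples = get_samples_by_dataset_id(
            session, dataset_id=dataset_id, limit=run.total_items
        )
        result = start_stt_evaluation_batch(session, run=run, samples=samples)
    return {"success": True, "run_id": run_id, "sample_count": len(samples)}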

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

  • Evaluation: STT #571 — Adds the original STT evaluation CRUD and batch submission flow that this PR replaces/refactors.
  • Evaluation #405 — Implements batch job primitives and batch/Celery infrastructure leveraged by the new async workflow.
  • Evaluation: Refactor cron #594 — Related polling/refactoring changes to cron/polling logic for evaluations with overlapping concerns.

Suggested reviewers

  • kartpop
  • vprashrex
  • Prajna1999

Poem

🐇 Hopping queues where tasks take flight,

Celery whispers through the night,
Signed URLs scurry, concurrent and fleet,
Bulk rows land softly—results complete,
A rabbit cheers for async delight!

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 inconclusive)
  • Title check (❓ Inconclusive): The title 'STT Evaluation: Refactor' is vague and generic; it only indicates that changes relate to STT evaluation without specifying the main nature or purpose of the refactor. Resolution: replace it with a more descriptive title that captures the core refactor goal, such as 'STT Evaluation: Move batch submission to async Celery jobs', to better communicate the primary change to reviewers.
✅ Passed checks (2 passed)
  • Description Check (✅ Passed): Check skipped - CodeRabbit’s high-level summary is enabled.
  • Docstring Coverage (✅ Passed): Docstring coverage is 100.00%, which is sufficient. The required threshold is 80.00%.


No actionable comments were generated in the recent review. 🎉

🧹 Recent nitpick comments
backend/app/crud/stt_evaluations/batch.py (3)

87-122: Well-structured concurrent URL signing — minor type-safety gap for error

The ThreadPoolExecutor + as_completed pattern is clean and appropriate for this I/O-bound workload. The closure over read-only file_map, storage, and signed_url_expires_in is thread-safe.

One small inconsistency: on line 90, failed_samples is annotated as list[tuple[STTSample, str]], but the else branch on line 118 appends (sample, error) where error has type str | None (from the return type of _generate_signed_url). If url were falsy-but-not-None (e.g., an empty string — extremely unlikely from get_signed_url), error would be None, violating the annotation. Consider either guarding with error or "Unknown error" or widening the annotation.

Suggested guard
             else:
-                failed_samples.append((sample, error))
+                failed_samples.append((sample, error or "Unknown error"))

131-132: Bare Exception raises could be more specific

Lines 132 and 188 raise plain Exception. A domain-specific exception (or at least RuntimeError) would let callers distinguish STT evaluation failures from unexpected errors and avoid overly broad except Exception handlers upstream.

Also applies to: 187-188
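
For instance, a small domain exception (class name hypothetical) would let callers distinguish these failures:

class STTEvaluationError(RuntimeError):
    """Raised when STT batch signing or submission fails (hypothetical name)."""


def ensure_any_signed(signed_urls: dict[int, str]) -> None:
    if not signed_urls:
        # Callers can now catch STTEvaluationError specifically
        # instead of a blanket `except Exception`.
        raise STTEvaluationError("All signed URL generation failed")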


31-38: Return type dict[str, Any] is broad — consider a TypedDict or dataclass.

The return value on line 204-209 has a well-defined shape (success, run_id, batch_jobs, sample_count). A TypedDict would give callers type-safe access and make the contract self-documenting. Not blocking, just a suggestion for when the interface stabilises.
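
A minimal sketch of such a TypedDict, with field names taken from the return shape noted above (the value types are assumptions):

from typing import TypedDict


class BatchSubmissionResult(TypedDict):
    success: bool
    run_id: int
    batch_jobs: list[str]  # assumed: provider batch job identifiers
    sample_count: int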




codecov bot commented Feb 12, 2026

Codecov Report

❌ Patch coverage is 20.96774% with 49 lines in your changes missing coverage. Please review.

Files with missing lines | Patch % | Lines
backend/app/crud/stt_evaluations/cron.py | 6.45% | 29 Missing ⚠️
backend/app/crud/stt_evaluations/batch.py | 5.00% | 19 Missing ⚠️
...ckend/app/api/routes/stt_evaluations/evaluation.py | 80.00% | 1 Missing ⚠️


@AkhileshNegi AkhileshNegi marked this pull request as ready for review February 12, 2026 16:48

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
backend/app/crud/stt_evaluations/cron.py (2)

118-118: ⚠️ Potential issue | 🟡 Minor

Dead code path: "processed" action is never returned by poll_stt_run.

poll_stt_run returns action values "completed", "failed", or "no_change" — never "processed". The "processed" check here is unreachable, which means the intent might be wrong or this is leftover from a prior version.

Suggested fix
-                    if result["action"] in ("completed", "processed"):
+                    if result["action"] == "completed":

354-359: 🛠️ Refactor suggestion | 🟠 Major

batch_job parameter typed as Any — should be BatchJob.

The coding guidelines require type hints on all function parameters. batch_job is used as a BatchJob throughout (accessing .id, .config, .provider_batch_id), so the type annotation should reflect that.

Suggested fix
 async def process_completed_stt_batch(
     session: Session,
     run: EvaluationRun,
-    batch_job: Any,
+    batch_job: BatchJob,
     batch_provider: GeminiBatchProvider,
 ) -> None:

As per coding guidelines, "Always add type hints to all function parameters and return values in Python code".

🤖 Fix all issues with AI agents
In `@backend/app/api/routes/stt_evaluations/evaluation.py`:
- Around line 108-111: The HTTPException currently returns internal exception
text via detail=f"Failed to queue batch submission: {e}"; change this to a
generic message (e.g. detail="Failed to queue batch submission") so internal
error details are not leaked to clients, keep the status_code=500, and ensure
the original exception is still logged (the existing logger call earlier in the
surrounding function in evaluation.py should remain unchanged); locate the raise
HTTPException(...) in the batch submission handler in evaluation.py and remove
the interpolated exception from the detail string.

In `@backend/app/crud/stt_evaluations/cron.py`:
- Around line 394-404: In process_completed_stt_batch's loop over
batch_responses replace the direct indexing response["custom_id"] with
response.get("custom_id", None) so a missing key yields None; keep the existing
try/except around int(raw_sample_id) (which will catch the None as TypeError)
and ensure you still log the same warning with batch_job.id and the
raw_sample_id value, increment failure_count and continue on error; reference
variables/functions: batch_responses, response, raw_sample_id, stt_sample_id,
batch_job.id, failure_count, and the surrounding for loop in cron.py.

In `@backend/app/services/stt_evaluations/batch_job.py`:
- Line 40: The direct conversion run_id = int(job_id) can raise ValueError for
non-numeric job_id; wrap the conversion in a try/except ValueError around the
run_id = int(job_id) statement, validate job_id first if you prefer, and in the
except block log the error (including the offending job_id), update the run/task
status to a failed/error state and exit/return the Celery task early so the run
record is updated instead of letting the task crash unhandled.
🧹 Nitpick comments (6)
backend/app/crud/stt_evaluations/dataset.py (1)

298-299: Misleading comment: limit is still applied, only the default was removed.

The comment says "removing limit" but the query still applies .limit(limit) on line 324. Consider rewording to clarify that the default was removed to force callers to be explicit about how many samples they want.

Suggested wording
-    # removing limit for now since we need all samples for batch job, can add pagination later if needed
-    limit: int,
+    limit: int,  # no default; callers must specify explicitly
backend/app/crud/stt_evaluations/batch.py (1)

107-107: Consider making max_workers configurable.

The hardcoded max_workers=10 is reasonable for typical workloads, but for large datasets with hundreds of samples, tuning may be needed. A constant or parameter would make this easier to adjust.
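
For example, a module-level constant (name hypothetical; a settings field would work equally well):

from concurrent.futures import ThreadPoolExecutor

# Hypothetical constant; could instead come from app settings or an env var.
STT_SIGNING_MAX_WORKERS = 10


def sign_all(paths: list[str]) -> list[str]:
    with ThreadPoolExecutor(max_workers=STT_SIGNING_MAX_WORKERS) as executor:
        # Stand-in signing step; the real code calls the storage client.
        return list(executor.map(lambda p: p + "?sig=...", paths))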

backend/app/services/stt_evaluations/batch_job.py (1)

15-23: Add type hint for task_instance and return type.

task_instance lacks a type annotation. Per coding guidelines, all function parameters and return values should have type hints. Also, the return type dict could be narrowed.

Suggested fix
+from typing import Any
+from celery import Task
+
 def execute_batch_submission(
     project_id: int,
     job_id: str,
     task_id: str,
-    task_instance,
+    task_instance: Task,
     organization_id: int,
     dataset_id: int,
     **kwargs,
-) -> dict:
+) -> dict[str, Any]:

As per coding guidelines, "Always add type hints to all function parameters and return values in Python code".

backend/app/tests/api/routes/test_stt_evaluation.py (1)

545-585: Test correctly updated for Celery offload flow.

The patch target, mock return value, and assertions align well with the route changes.

Consider using assert_called_once_with(...) to verify the correct function_path, job_id, organization_id, and dataset_id are passed to start_low_priority_job, which would catch wiring bugs.
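
A hedged sketch of that assertion; the keyword names mirror the enqueue call in the walkthrough, but the exact signature of start_low_priority_job is an assumption:

mock_start_low_priority_job.assert_called_once_with(
    function_path="app.services.stt_evaluations.batch_job.execute_batch_submission",
    job_id=str(run.id),
    organization_id=organization.id,
    dataset_id=dataset.id,
)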

backend/app/crud/stt_evaluations/cron.py (2)

371-374: Log format inconsistency: mixed = and : separators.

Lines 372–373 and 443–446 use key=value format (e.g., run_id={run.id}), while other functions in this file use key: {value} format (e.g., Line 128 run_id: {run.id}). Consider unifying to one style for consistent structured log parsing.

Also applies to: 443-446


427-433: Use modern SQLAlchemy 2.0 session.execute(insert(...)) instead of legacy bulk_insert_mappings().

Session.bulk_insert_mappings() is a legacy feature deprecated in SQLAlchemy 2.0 in favor of session.execute(insert(Model), list_of_dicts), which also provides better performance through insertmanyvalues batching. Additionally, if an exception occurs after chunks are flushed but before session.commit(), the partially flushed data remains in the session—the exception handler currently re-raises without explicit rollback, delegating recovery to the caller.

Replace with:

from sqlalchemy import insert

# Bulk insert in batches of 200
insert_batch_size = 200
for i in range(0, len(stt_result_rows), insert_batch_size):
    chunk = stt_result_rows[i : i + insert_batch_size]
    session.execute(insert(STTResult), chunk)
if stt_result_rows:
    session.commit()

Comment on lines +108 to 111
        raise HTTPException(
            status_code=500,
            detail=f"Failed to queue batch submission: {e}",
        )

⚠️ Potential issue | 🟡 Minor

Avoid leaking internal exception details in the HTTP response.

detail=f"Failed to queue batch submission: {e}" exposes internal error information to the API client. Use a generic message instead; the detailed error is already logged on line 99.

Suggested fix
         raise HTTPException(
             status_code=500,
-            detail=f"Failed to queue batch submission: {e}",
+            detail="Failed to start evaluation. Please try again later.",
         )

Comment on lines +394 to 404

         for response in batch_responses:
             raw_sample_id = response["custom_id"]
             try:
-                sample_id = int(custom_id)
+                stt_sample_id = int(raw_sample_id)
             except (ValueError, TypeError):
                 logger.warning(
                     f"[process_completed_stt_batch] Invalid custom_id | "
-                    f"batch_job_id={batch_job.id}, custom_id={custom_id}"
+                    f"batch_job_id={batch_job.id}, custom_id={raw_sample_id}"
                 )
-                failed_count += 1
+                failure_count += 1
                 continue

⚠️ Potential issue | 🟠 Major

response["custom_id"] will raise KeyError if the key is missing.

If a batch response lacks "custom_id", this line throws an unhandled KeyError that propagates up and aborts processing of the entire batch. Use .get() with a fallback and handle the missing-key case alongside the existing ValueError/TypeError guard.

Suggested fix
         for response in batch_responses:
-            raw_sample_id = response["custom_id"]
+            raw_sample_id = response.get("custom_id")
             try:
                 stt_sample_id = int(raw_sample_id)
             except (ValueError, TypeError):
                 logger.warning(
                     f"[process_completed_stt_batch] Invalid custom_id | "
                     f"batch_job_id={batch_job.id}, custom_id={raw_sample_id}"
                 )
                 failure_count += 1
                 continue

With this change, a None value from .get() will be caught by the TypeError handler below, logging a warning and continuing gracefully.


Comment on line 40

    Returns:
        dict: Result summary with batch job info
    """
    run_id = int(job_id)

⚠️ Potential issue | 🟡 Minor

Unhandled ValueError if job_id is not a valid integer.

int(job_id) will raise ValueError for non-numeric strings. Since this runs in a Celery worker, an unhandled exception will cause the task to fail without updating the run status. Consider wrapping in a try/except or validating before conversion.

Suggested fix
-    run_id = int(job_id)
+    try:
+        run_id = int(job_id)
+    except ValueError:
+        logger.error(f"[execute_batch_submission] Invalid job_id: {job_id}")
+        return {"success": False, "error": f"Invalid job_id: {job_id}"}

@vprashrex vprashrex self-requested a review February 16, 2026 03:56
